package www.larkmidtable.com;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import www.larkmidtable.com.channel.Channel;
import www.larkmidtable.com.reader.AbstractDBReader;
import www.larkmidtable.com.reader.Reader;
import www.larkmidtable.com.util.DBType;
import www.larkmidtable.com.util.DBUtil;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/**
 * PostgreSQL implementation of {@link AbstractDBReader}.
 *
 * <p>Opens a JDBC connection from the settings in {@code configBean}, splits the
 * configured table into {@code OFFSET}/{@code LIMIT} queries (one per configured
 * thread) and hands those queries to the read helpers {@code batchStartRead} /
 * {@code defaultSingleStartRead}, which are not defined in this file (presumably
 * inherited from {@link AbstractDBReader}). Every read method returns the shared
 * queue obtained from {@link Channel#getQueue()}.
 *
 * <p>Failures are logged rather than rethrown, so callers never see an exception
 * from this class; a failed {@link #open()} simply leaves the connection unset.
 *
 * @author yanze.he
 * @Date: 2023/05/28 22:01
 * @Description: PostgreSQL reader
 **/
public class PgReader extends AbstractDBReader {

	private static final Logger logger = LoggerFactory.getLogger(PgReader.class);

	/** JDBC connection created by {@link #open()}; stays {@code null} if opening failed. */
	private Connection connection;

	/**
	 * Loads the PostgreSQL driver and opens the JDBC connection using the URL,
	 * user name and password from {@code configBean}.
	 *
	 * <p>Any failure is logged and swallowed; in that case {@code connection}
	 * keeps its previous value (initially {@code null}).
	 */
	@Override
	public void open() {
		try {
			logger.info("PostgreSQL的Reader建立连接开始....");
			Class.forName(DBType.PostgreSql.getDriverClass());
			connection = DriverManager
					.getConnection(configBean.getUrl(), configBean.getUsername(), configBean.getPassword());
			logger.info("PostgreSQL的Reader建立连接结束....");
		} catch (Exception e) {
			logger.error("Failed to open the PostgreSQL reader connection", e);
		}
	}

	/**
	 * Reads all given splits and returns the shared channel queue.
	 *
	 * <p>More than one split is delegated to {@code batchStartRead} (multi-threaded
	 * read, per the original author's note); exactly one split is read with
	 * {@code defaultSingleStartRead}. A {@code null} or empty array reads nothing.
	 *
	 * @param inputSplits the SQL queries to execute, typically from {@link #createInputSplits()}
	 * @return the queue from {@link Channel#getQueue()}, also when reading failed
	 */
	@Override
	public Queue<List<String>> startRead(String[] inputSplits) {
		logger.info("PostgreSQL读取数据操作....");
		long startTime = System.currentTimeMillis();
		if (inputSplits == null || inputSplits.length == 0) {
			// Previously this case surfaced as a caught NPE / ArrayIndexOutOfBoundsException.
			logger.warn("No input splits supplied to the PostgreSQL reader; nothing to read");
		} else {
			try {
				if (inputSplits.length > 1) {
					// Several splits: read them with multiple threads.
					batchStartRead(connection, inputSplits);
				} else {
					defaultSingleStartRead(connection, inputSplits[0]);
				}
			} catch (Exception e) {
				logger.error("PostgreSQL read failed", e);
			}
		}
		long endTime = System.currentTimeMillis();
		logger.info("PostgreSQL读取数据结束....耗时：{}ms", endTime - startTime);
		return Channel.getQueue();
	}

	/**
	 * Reads a single split with {@code defaultSingleStartRead} and returns the
	 * shared channel queue.
	 *
	 * @param inputSplit the SQL query to execute
	 * @return the queue from {@link Channel#getQueue()}, also when reading failed
	 */
	@Override
	public Queue<List<String>> startRead(String inputSplit) {
		logger.info("PostgreSQL读取数据操作....");
		try {
			logger.debug("PostgreSQL input split: {}", inputSplit);
			defaultSingleStartRead(connection, inputSplit);
		} catch (Exception e) {
			logger.error("PostgreSQL read failed for split: {}", inputSplit, e);
		}
		logger.info("PostgreSQL读取数据结束....");
		return Channel.getQueue();
	}


	/**
	 * Builds the base query {@code select <columns> from <table>} from
	 * {@code configBean} and splits it with {@link #defaultInputSplits(String, String)}.
	 *
	 * @return the split queries, in split order
	 */
	@Override
	public String[] createInputSplits() {
		logger.info("PostgreSQL的Reader开始进行分片开始....");
		String inputSql = String.format("select %s from %s", configBean.getColumn(), configBean.getTable());
		List<String> results = defaultInputSplits(configBean.getColumn(), inputSql);
		logger.info("PostgreSQL的Reader开始进行分片结束....");
		return results.toArray(new String[0]);
	}

	/**
	 * PostgreSQL paging: wraps {@code originInput} in one {@code OFFSET}/{@code LIMIT}
	 * query per configured thread.
	 *
	 * <p>With {@code n} threads and {@code count} rows, every split covers
	 * {@code count / n} rows and the last split additionally takes the
	 * {@code count % n} remainder. When the table is empty (or counting failed)
	 * or the configured thread count is not positive, the unmodified
	 * {@code originInput} is returned as the only split so the data is still read.
	 *
	 * <p>NOTE(review): the generated queries carry no {@code ORDER BY}. PostgreSQL
	 * does not guarantee a stable row order across separate queries without one,
	 * so splits could overlap or miss rows; fixing that needs a sort key that is
	 * not available in this class.
	 *
	 * @param column      the column list selected by the outer query
	 * @param originInput the base query to page over
	 * @return the split queries; never empty
	 */
	public List<String> defaultInputSplits(String column, String originInput) {
		List<String> splits = new ArrayList<>();
		int count = count();
		int threads = configBean.getThread();
		// A switch for enabling/disabling SQL splitting may be added here later.
		if (count <= 0 || threads <= 0) {
			splits.add(originInput);
			return splits;
		}

		int pageSize = count / threads;
		// The last split also picks up the rows left over by the integer division.
		int lastPageSize = pageSize + count % threads;
		for (int i = 0; i < threads; i++) {
			int offset = i * pageSize;
			int limit = (i == threads - 1) ? lastPageSize : pageSize;
			StringBuilder builder = new StringBuilder("SELECT " + column + " FROM ( ");
			builder.append(" ").append(originInput).append(" ) t").append(" ").append("OFFSET");
			builder.append(" ").append(offset).append(" LIMIT ").append(limit);
			splits.add(builder.toString());
		}
		return splits;
	}

	/**
	 * Closes the JDBC connection if one was opened. Errors while closing are
	 * logged and swallowed.
	 */
	@Override
	public void close() {
		logger.info("PostgreSQL的Reader开始进行关闭连接开始....");
		// open() may have failed, in which case there is nothing to close.
		if (connection != null) {
			try {
				connection.close();
			} catch (SQLException e) {
				logger.error("Failed to close the PostgreSQL reader connection", e);
			}
		}
		logger.info("PostgreSQL的Reader开始进行关闭连接结束....");
	}

	/**
	 * Counts the rows of the configured table with {@code SELECT count(*)}.
	 *
	 * <p>The table name comes from {@code configBean} and is concatenated into the
	 * statement because identifiers cannot be bound as parameters.
	 *
	 * @return the row count, or {@code 0} when there is no connection or the query fails
	 */
	@Override
	public int count() {
		if (connection == null) {
			logger.error("Cannot count rows: the PostgreSQL connection is not open");
			return 0;
		}
		String sql = "SELECT count(*) FROM " + configBean.getTable();
		try (PreparedStatement preparedStatement = connection.prepareStatement(sql);
				ResultSet resultSet = preparedStatement.executeQuery()) {
			return resultSet.next() ? resultSet.getInt(1) : 0;
		} catch (SQLException | RuntimeException e) {
			// Best effort: callers treat 0 as "nothing to split", so log with the cause and fall through.
			logger.error("Failed to count rows of the PostgreSQL table", e);
		}
		return 0;
	}
}
